import asyncio
import inspect
import os
import platform
import signal
import socket
import sys
import time
import warnings
from contextlib import suppress
from ipaddress import ip_address
from pathlib import Path

import click
import httpx
import typer
from dotenv import load_dotenv
from fastapi import HTTPException
from httpx import HTTPError
from jose import JWTError
from lfx.log.logger import configure, logger
from lfx.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from multiprocess import cpu_count
from multiprocess.context import Process
from packaging import version as pkg_version
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from sqlmodel import select

from langflow.cli.progress import create_langflow_progress
from langflow.initial_setup.setup import get_or_create_default_folder
from langflow.main import setup_app
from langflow.services.auth.utils import check_key, get_current_user_by_jwt
from langflow.services.deps import get_db_service, get_settings_service, is_settings_service_initialized, session_scope
from langflow.services.utils import initialize_services
from langflow.utils.version import fetch_latest_version, get_version_info
from langflow.utils.version import is_pre_release as langflow_is_pre_release

# Root Typer application; with no arguments the CLI prints its help text.
app = typer.Typer(no_args_is_help=True)
# Module-wide Rich console, used below for the migration tables and the startup banner.
console = Console()
if platform.system() == "Windows":
    console = Console(legacy_windows=True, emoji=False)  # Initialize console with Windows-safe settings
    # NOTE(review): presumably the selector loop is forced for compatibility with async dependencies
    # on Windows - confirm the exact reason.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # type: ignore[attr-defined]

# Add LFX commands as a sub-app
# The imports live inside the try block so that a missing `lfx.cli` package only disables
# the `lfx` sub-commands instead of breaking the whole CLI.
try:
    from lfx.cli.commands import serve_command
    from lfx.cli.run import run as lfx_run

    lfx_app = typer.Typer(name="lfx", help="Langflow Executor commands")
    lfx_app.command(name="serve", help="Serve a flow as an API", no_args_is_help=True)(serve_command)
    lfx_app.command(name="run", help="Run a flow directly", no_args_is_help=True)(lfx_run)

    app.add_typer(lfx_app, name="lfx")
except ImportError:
    # LFX not available, skip adding the sub-app
    pass


class ProcessManager:
    """Owns the backend web process and coordinates its orderly shutdown."""

    def __init__(self):
        self.webapp_process = None
        self.shutdown_in_progress = False
        # ASCII smiley on Windows, Unicode wave everywhere else.
        self._farewell_emoji = ":)" if platform.system() == "Windows" else "👋"

    def _shutdown_once(self) -> None:
        """Start the shutdown sequence unless a signal has already triggered it."""
        if not self.shutdown_in_progress:
            self.shutdown_in_progress = True
            self.shutdown()

    # Signal handlers are invoked with (signum, frame); both are accepted but unused.
    def handle_sigterm(self, _signum: int, _frame) -> None:
        """React to SIGTERM by shutting down; repeated signals are ignored."""
        self._shutdown_once()

    # Signal handlers are invoked with (signum, frame); both are accepted but unused.
    def handle_sigint(self, _signum: int, _frame) -> None:
        """React to SIGINT by shutting down; repeated signals are ignored."""
        self._shutdown_once()

    def shutdown(self):
        """Stop the webapp process if it is running, then exit this process with status 0."""
        process = self.webapp_process
        if process and process.is_alive():
            # Only terminate here: the shutdown progress itself is reported by the
            # FastAPI lifespan context in main.py.
            process.terminate()
            # A generous wait lets the process finish its setup, so that it does not end up
            # with background tasks still doing work after the termination request.
            process.join(timeout=30)
            if process.is_alive():
                logger.warning("Process didn't terminate gracefully, killing it.")
                process.kill()
                process.join()
            self.print_farewell_message()

        sys.exit(0)

    def print_farewell_message(self) -> None:
        """Wipe the current terminal line and print a goodbye message."""
        # Return to column 0, blank out 80 columns of leftover progress output, return again.
        sys.stdout.write("\r" + " " * 80 + "\r")

        click.echo()
        click.echo(click.style(f"{self._farewell_emoji} See you next time!", fg="bright_blue", bold=True))


# Create a single instance of ProcessManager
# (module-level so that both the signal handlers below and `run` operate on the same object).
process_manager = ProcessManager()

# Update signal handlers to use the instance methods
# NOTE: this registration happens at import time, so importing this module replaces the
# process-wide SIGTERM and SIGINT handlers.
signal.signal(signal.SIGTERM, process_manager.handle_sigterm)
signal.signal(signal.SIGINT, process_manager.handle_sigint)


def get_number_of_workers(workers=None):
    """Resolve the worker count, deriving it from the CPU count when unspecified.

    Args:
        workers: Requested number of workers. ``None`` or ``-1`` means "choose automatically".

    Returns:
        The requested value, or ``cpu_count() * 2 + 1`` when it was ``None`` or ``-1``.
    """
    auto_select = workers == -1 or workers is None
    resolved = (cpu_count() * 2) + 1 if auto_select else workers
    logger.debug(f"Number of workers: {resolved}")
    return resolved


def display_results(results) -> None:
    """Render one console table per entry in ``results``.

    Each entry is expected to expose ``table_name`` and an iterable ``results`` whose
    items have ``name``, ``type`` and ``success`` attributes.
    """
    for group in results:
        report = Table(title=f"Migration {group.table_name}")
        for heading in ("Name", "Type", "Status"):
            report.add_column(heading)

        for entry in group.results:
            if entry.success:
                status, color = "Success", "green"
            else:
                status, color = "Failure", "red"
            report.add_row(entry.name, entry.type, f"[{color}]{status}[/{color}]")

        console.print(report)
        console.print()  # Blank line between tables


def set_var_for_macos_issue() -> None:
    """Set the environment variables needed to run gunicorn on macOS; no-op elsewhere."""
    if platform.system() != "Darwin":
        return

    # OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
    # we need to set this var if we are running on MacOS
    # otherwise we get an error when running gunicorn
    # https://stackoverflow.com/questions/75747888/uwsgi-segmentation-fault-with-flask-python-app-behind-nginx-after-running-for-2 # noqa: E501
    os.environ.update(
        {
            "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES",
            "no_proxy": "*",  # to avoid error with gunicorn
        }
    )


def wait_for_server_ready(host, port, protocol) -> None:
    """Block until the server's ``/health`` endpoint answers with HTTP 200.

    The endpoint is polled once per second. Every unsuccessful attempt (a connection
    error, any other exception, or a non-200 response) is followed by a one-second
    pause, so a server that is up but not yet healthy is not hammered in a tight loop.

    Args:
        host: Host the server was bound to.
        port: Port the server listens on.
        protocol: URL scheme to use for the health check ("http" or "https").
    """
    # Use localhost for health check when host is 0.0.0.0 (bind to all interfaces)
    health_check_host = "localhost" if host == "0.0.0.0" else host  # noqa: S104
    health_url = f"{protocol}://{health_check_host}:{port}/health"
    # Certificate verification is skipped for local addresses.
    verify_tls = health_check_host not in ("127.0.0.1", "localhost")

    while True:
        try:
            status_code = httpx.get(health_url, verify=verify_tls).status_code
        except HTTPError:
            # Expected while the server is still starting up; retry after the pause below.
            pass
        except Exception:  # noqa: BLE001
            logger.debug("Error while waiting for the server to become ready.", exc_info=True)
        else:
            if status_code == httpx.codes.OK:
                return
        time.sleep(1)


# CLI entry point for `langflow run`: loads configuration, builds the application and starts the
# server (uvicorn on Windows, a gunicorn-style LangflowApplication in a child process elsewhere).
# Every option except `components_path` defaults to None so that "not given on the command line"
# can be told apart from an explicit value (see the frame inspection in step 1).
@app.command()
def run(
    *,
    host: str | None = typer.Option(None, help="Host to bind the server to.", show_default=False),
    workers: int | None = typer.Option(None, help="Number of worker processes.", show_default=False),
    worker_timeout: int | None = typer.Option(None, help="Worker timeout in seconds.", show_default=False),
    port: int | None = typer.Option(None, help="Port to listen on.", show_default=False),
    # Unlike the other options this one has a non-None default, so it is always applied in step 1.
    components_path: Path | None = typer.Option(
        Path(__file__).parent / "components",
        help="Path to the directory containing custom components.",
        show_default=False,
    ),
    # .env file param
    env_file: Path | None = typer.Option(
        None,
        help="Path to the .env file containing environment variables.",
        show_default=False,
    ),
    log_level: str | None = typer.Option(
        None,
        help="Logging level. One of: [debug, info, warning, error, critical]. Defaults to info.",
        show_default=False,
    ),
    log_file: Path | None = typer.Option(None, help="Path to the log file.", show_default=False),
    log_rotation: str | None = typer.Option(None, help="Log rotation(Time/Size).", show_default=False),
    # Options marked `noqa: ARG001` are never referenced by name in the body; they only take
    # effect through the frame inspection in step 1, when the settings object has a matching attribute.
    cache: str | None = typer.Option(  # noqa: ARG001
        None,
        help="Type of cache to use. (InMemoryCache, SQLiteCache)",
        show_default=False,
    ),
    dev: bool | None = typer.Option(None, help="Run in development mode (may contain bugs)", show_default=False),  # noqa: ARG001
    frontend_path: str | None = typer.Option(
        None,
        help="Path to the frontend directory containing build files. This is for development purposes only.",
        show_default=False,
    ),
    open_browser: bool | None = typer.Option(
        None,
        help="Open the browser after starting the server.",
        show_default=False,
    ),
    remove_api_keys: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Remove API keys from the projects saved in the database.",
        show_default=False,
    ),
    backend_only: bool | None = typer.Option(
        None,
        help="Run only the backend server without the frontend.",
        show_default=False,
    ),
    store: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Enables the store features.",
        show_default=False,
    ),
    auto_saving: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines if the auto save is enabled.",
        show_default=False,
    ),
    auto_saving_interval: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the debounce time for the auto save.",
        show_default=False,
    ),
    # NOTE(review): annotated as bool although the help text describes a retry count - confirm
    # the intended type.
    health_check_max_retries: bool | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the number of retries for the health check.",
        show_default=False,
    ),
    max_file_size_upload: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the maximum file size for the upload in MB.",
        show_default=False,
    ),
    webhook_polling_interval: int | None = typer.Option(  # noqa: ARG001
        None,
        help="Defines the polling interval for the webhook.",
        show_default=False,
    ),
    ssl_cert_file_path: str | None = typer.Option(
        None, help="Defines the SSL certificate file path.", show_default=False
    ),
    ssl_key_file_path: str | None = typer.Option(None, help="Defines the SSL key file path.", show_default=False),
) -> None:
    """Run Langflow."""
    # The .env file must be loaded before the settings service exists, otherwise the settings
    # would already have been built from the previous environment.
    if env_file:
        if is_settings_service_initialized():
            err = (
                "Settings service is already initialized. This indicates potential race conditions "
                "with settings initialization. Ensure the settings service is not created during "
                "module loading."
            )
            # i.e. ensures the env file is loaded before the settings service is initialized
            raise ValueError(err)
        load_dotenv(env_file, override=True)

    # Set and normalize log level, with precedence: cli > env > default
    # After this point `log_level` is never None, so step 1 always treats it as a provided argument.
    log_level = (log_level or os.environ.get("LANGFLOW_LOG_LEVEL") or "info").lower()
    os.environ["LANGFLOW_LOG_LEVEL"] = log_level

    configure(log_level=log_level, log_file=log_file, log_rotation=log_rotation)

    # Create progress indicator (show verbose timing if log level is DEBUG)
    verbose = log_level == "debug"
    progress = create_langflow_progress(verbose=verbose)

    # Step 0: Initializing Langflow
    with progress.step(0):
        logger.debug(f"Loading config from file: '{env_file}'" if env_file else "No env_file provided.")
        set_var_for_macos_issue()
        settings_service = get_settings_service()

    # Step 1: Checking Environment
    with progress.step(1):
        # Copy environment values onto the auth settings. Every environment variable is considered
        # (not only LANGFLOW_-prefixed ones), and "LANGFLOW_" is removed wherever it occurs in the name.
        for key, value in os.environ.items():
            new_key = key.replace("LANGFLOW_", "")
            if hasattr(settings_service.auth_settings, new_key):
                setattr(settings_service.auth_settings, new_key, value)

        # Inspect this function's own frame to find the parameters that currently hold a non-None
        # value. `arguments` are the parameter names; `values` maps names to their current values,
        # so reassignments made earlier in this function (e.g. `log_level`) are what is seen here.
        frame = inspect.currentframe()
        valid_args: list = []
        values: dict = {}
        if frame is not None:
            arguments, _, _, values = inspect.getargvalues(frame)
            valid_args = [arg for arg in arguments if values[arg] is not None]

        # Push each provided parameter into the general settings, falling back to the auth settings;
        # parameters matching neither are only logged.
        for arg in valid_args:
            if arg == "components_path":
                settings_service.settings.update_settings(components_path=components_path)
            elif hasattr(settings_service.settings, arg):
                settings_service.set(arg, values[arg])
            elif hasattr(settings_service.auth_settings, arg):
                settings_service.auth_settings.set(arg, values[arg])
            logger.debug(f"Loading config from cli parameter '{arg}': '{values[arg]}'")

        # Get final values from settings
        # (the locals are rebound so the rest of the function uses the merged cli/env/default values)
        host = settings_service.settings.host
        port = settings_service.settings.port
        workers = settings_service.settings.workers
        worker_timeout = settings_service.settings.worker_timeout
        log_level = settings_service.settings.log_level
        frontend_path = settings_service.settings.frontend_path
        backend_only = settings_service.settings.backend_only
        # For the SSL paths an explicit CLI value wins; otherwise the settings value is used.
        ssl_cert_file_path = (
            settings_service.settings.ssl_cert_file if ssl_cert_file_path is None else ssl_cert_file_path
        )
        ssl_key_file_path = settings_service.settings.ssl_key_file if ssl_key_file_path is None else ssl_key_file_path

        # create path object if frontend_path is provided
        static_files_dir: Path | None = Path(frontend_path) if frontend_path else None

    # Step 2: Starting Core Services
    with progress.step(2):
        # This local `app` (the object later handed to uvicorn / LangflowApplication) shadows the
        # module-level Typer `app` for the remainder of this function.
        app = setup_app(static_files_dir=static_files_dir, backend_only=bool(backend_only))

    # Step 3: Connecting Database (this happens inside setup_app via dependencies)
    with progress.step(3):
        # check if port is being used
        # NOTE: the in-use check probes `host`, while get_free_port is called without a host and
        # therefore probes its default ("localhost").
        if is_port_in_use(port, host):
            port = get_free_port(port)

        # Store the runtime-detected port in settings (temporary until strict port enforcement)
        get_settings_service().settings.runtime_port = port

        # HTTPS is only used when both the certificate and the key path are set.
        protocol = "https" if ssl_cert_file_path and ssl_key_file_path else "http"

    # Step 4: Loading Components (placeholder for components loading)
    with progress.step(4):
        pass  # Components are loaded during app startup

    # Step 5: Adding Starter Projects (placeholder for starter projects)
    if get_settings_service().settings.create_starter_projects:
        with progress.step(5):
            pass  # Starter projects are added during app startup

    # Step 6: Launching Langflow
    if platform.system() == "Windows":
        with progress.step(6):
            import uvicorn

            # Print summary and banner before starting the server, since uvicorn is a blocking call.
            # We _may_ be able to subprocess, but with window's spawn behavior, we'd have to move all
            # non-picklable code to the subprocess.
            progress.print_summary()
            print_banner(str(host), int(port or 7860), protocol)

        # Blocking call, so must be outside of the progress step
        uvicorn.run(
            app,
            host=host,
            port=port,
            log_level=log_level,
            reload=False,
            workers=get_number_of_workers(workers),
            loop="asyncio",
        )
    else:
        with progress.step(6):
            # Use Gunicorn with LangflowUvicornWorker for non-Windows systems
            from langflow.server import LangflowApplication

            options = {
                "bind": f"{host}:{port}",
                "workers": get_number_of_workers(workers),
                "timeout": worker_timeout,
                "certfile": ssl_cert_file_path,
                "keyfile": ssl_key_file_path,
                "log_level": log_level.lower() if log_level is not None else "info",
            }
            server = LangflowApplication(app, options)

            # Start the webapp process
            # (a child process, so this process stays free to poll for readiness and print the banner)
            process_manager.webapp_process = Process(target=server.run)
            process_manager.webapp_process.start()

            # Blocks until the health endpoint returns HTTP 200.
            wait_for_server_ready(host, port, protocol)

        # Print summary and banner after server is ready
        progress.print_summary()
        print_banner(str(host), int(port or 7860), protocol)

        # Handle browser opening
        if open_browser and not backend_only:
            click.launch(f"{protocol}://{host}:{port}")

        # Stay attached to the server process until it ends; shutdown() always finishes with
        # sys.exit(0), so nothing after this try statement would run.
        try:
            process_manager.webapp_process.join()
        except KeyboardInterrupt:
            # SIGINT should be handled by the signal handler, but leaving here for safety
            logger.warning("KeyboardInterrupt caught in main thread")
        finally:
            process_manager.shutdown()


def is_port_in_use(port, host="localhost"):
    """Report whether something accepts TCP connections on ``host:port``.

    Args:
        port (int): The port number to probe.
        host (str): The host to probe. Defaults to 'localhost'.

    Returns:
        bool: True when an IPv4 TCP connection attempt succeeds, False otherwise.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        error_code = probe.connect_ex((host, port))
    finally:
        probe.close()
    return error_code == 0


def get_free_port(port):
    """Return the first port at or above ``port`` that is not in use.

    Ports are probed one by one with ``is_port_in_use`` using its default host.

    Args:
        port (int): The port number to start searching from.

    Returns:
        int: The first port found that is not in use.
    """
    candidate = port
    while True:
        if not is_port_in_use(candidate):
            return candidate
        candidate += 1


def is_loopback_address(host: str) -> bool:
    """Tell whether ``host`` should be treated as a local (loopback) address.

    "localhost" and the bind-all address "0.0.0.0" count as loopback, as does any
    IP literal in a loopback range (127.0.0.0/8 for IPv4, ::1 for IPv6).

    Args:
        host: The host address to check

    Returns:
        bool: True if the host is treated as loopback, False otherwise (including
        when ``host`` is not a valid IP address).
    """
    # Names that are accepted without parsing; "0.0.0.0" binds to all interfaces.
    if host in ("localhost", "0.0.0.0"):  # noqa: S104
        return True

    try:
        parsed = ip_address(host)
    except ValueError:
        # Not an IP literal at all
        return False
    return bool(parsed.is_loopback)


def can_connect(host: str, port: int, timeout: float = 1.0) -> bool:
    """Check whether a TCP connection to ``host:port`` can be established.

    Every address returned by name resolution is tried in turn; the first successful
    connection wins. Any exception is logged and treated as "cannot connect".

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Per-attempt socket timeout in seconds.

    Returns:
        bool: True if any resolved address accepted the connection, False otherwise.
    """
    try:
        addresses = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
        for family, socktype, proto, _canonname, sockaddr in addresses:
            with socket.socket(family, socktype, proto) as probe:
                probe.settimeout(timeout)
                reachable = probe.connect_ex(sockaddr) == 0
            if reachable:
                return True
    except Exception as e:  # noqa: BLE001
        logger.exception("Failed to connect to %s:%s", host, port, exc_info=e)
    return False


def get_best_access_host(host: str, port: int) -> str:
    """Pick the host name to show users for reaching the server.

    Non-loopback hosts are returned unchanged. For loopback hosts, 'localhost' is
    preferred over IP addresses such as '127.0.0.1' because it is more universally
    supported across operating systems and network configurations; the original host
    is only kept when 'localhost' is not reachable but the host itself is.

    Args:
        host: The original host address
        port: The port number

    Returns:
        str: The best host address to use for access
    """
    if is_loopback_address(host):
        # Probe 'localhost' first (unless the host already is 'localhost').
        localhost_reachable = host != "localhost" and can_connect("localhost", port)
        # Fall back to 'localhost' as well when the host itself cannot be reached.
        if localhost_reachable or not can_connect(host, port):
            return "localhost"
    return host


def get_letter_from_version(version: str) -> str | None:
    """Get the letter from a pre-release version."""
    if "a" in version:
        return "a"
    if "b" in version:
        return "b"
    if "rc" in version:
        return "rc"
    return None


def build_version_notice(current_version: str, package_name: str) -> str:
    """Describe a newer release of ``package_name``, if one exists.

    The latest version is looked up with ``fetch_latest_version``; pre-releases are
    only considered when the installed version is itself a pre-release. A connection
    error during the lookup is ignored and treated as "nothing newer".

    Args:
        current_version (str): The currently installed version of the package
        package_name (str): The name of the package to check

    Returns:
        str: A notice naming the newer version (and whether it is a pre-release),
            or an empty string when there is none.

    Example:
        >>> build_version_notice("1.0.0", "langflow")
        'A new version of langflow is available: 1.1.0'
    """
    try:
        allow_prerelease = langflow_is_pre_release(current_version)
        latest_version = fetch_latest_version(package_name, include_prerelease=allow_prerelease)
        if latest_version and pkg_version.parse(current_version) < pkg_version.parse(latest_version):
            if langflow_is_pre_release(latest_version):
                release_type = "pre-release"
            else:
                release_type = "version"
            return f"A new {release_type} of {package_name} is available: {latest_version}"
    except httpx.ConnectError:
        # Being offline must not prevent startup; fall through to "no notice".
        pass
    return ""


def generate_pip_command(package_names, is_pre_release) -> str:
    """Build the ``pip install ... -U`` upgrade command for ``package_names``.

    ``--pre`` is appended when ``is_pre_release`` is truthy.
    """
    packages = " ".join(package_names)
    pre_flag = " --pre" if is_pre_release else ""
    return f"pip install {packages} -U{pre_flag}"


def stylize_text(text: str, to_style: str, *, is_prerelease: bool) -> str:
    """Wrap every occurrence of ``to_style`` inside ``text`` in a Rich color tag.

    The color is "#42a7f5" for pre-releases and "#6e42f5" otherwise.
    """
    if is_prerelease:
        color = "#42a7f5"
    else:
        color = "#6e42f5"
    return text.replace(to_style, f"[{color}]{to_style}[/]")


def print_banner(host: str, port: int, protocol: str) -> None:
    """Print the startup banner with project links, the telemetry notice and the access URL.

    The banner is rendered as a Rich panel. If the console cannot encode it, a plain
    ASCII panel is tried, and as a last resort the key lines are sent to the logger.

    Args:
        host: Host the server was bound to; ``get_best_access_host`` picks the host shown in the link.
        port: Port the server listens on.
        protocol: URL scheme used in the access link.
    """
    # Use langflow.utils.version to get the version info
    version_info = get_version_info()
    langflow_version = version_info["version"]
    package_name = version_info["package"]
    is_pre_release = langflow_is_pre_release(langflow_version)

    # Collect update notices.
    # NOTE(review): the notices are not part of the printed message; they currently only
    # influence the color of the package name in the title - confirm whether they were
    # meant to be displayed.
    notices = []
    notice = build_version_notice(langflow_version, package_name)
    notice = stylize_text(notice, package_name, is_prerelease=is_pre_release)
    if notice:
        notices.append(notice)

    # Add pip install command to notices if any package needs an update
    if notices:
        pip_command = generate_pip_command([package_name], is_pre_release)
        notices.append(f"Run '{pip_command}' to update.")

    styled_package_name = stylize_text(
        package_name, package_name, is_prerelease=any("pre-release" in notice for notice in notices)
    )

    title = f"[bold]Welcome to {styled_package_name}[/bold]\n"

    # Use Windows-safe characters to prevent encoding issues
    if platform.system() == "Windows":
        github_icon = "*"
        discord_icon = "#"
        arrow = "->"
        status_icon = "[OK]"
    else:
        github_icon = ":star2:"
        discord_icon = ":speech_balloon:"
        arrow = "→"
        status_icon = "🟢"

    info_text = (
        f"{github_icon} GitHub: Star for updates {arrow} https://github.com/langflow-ai/langflow\n"
        f"{discord_icon} Discord: Join for support {arrow} https://discord.com/invite/EqksyE2EX9"
    )
    # DO_NOT_TRACK takes precedence; LANGFLOW_DO_NOT_TRACK is only consulted when it is unset.
    telemetry_text = (
        (
            "We collect anonymous usage data to improve Langflow.\n"
            "To opt out, set: [bold]DO_NOT_TRACK=true[/bold] in your environment."
        )
        if os.getenv("DO_NOT_TRACK", os.getenv("LANGFLOW_DO_NOT_TRACK", "False")).lower() != "true"
        else (
            "We are [bold]not[/bold] collecting anonymous usage data to improve Langflow.\n"
            "To contribute, set: [bold]DO_NOT_TRACK=false[/bold] in your environment."
        )
    )
    access_host = get_best_access_host(host, port)
    access_link = f"[bold]{status_icon} Open Langflow {arrow}[/bold] [link={protocol}://{access_host}:{port}]{protocol}://{access_host}:{port}[/link]"

    message = f"{title}\n{info_text}\n\n{telemetry_text}\n\n{access_link}"

    # Handle Unicode encoding errors on Windows
    try:
        console.print()  # Add line break before banner
        console.print(Panel.fit(message, border_style="#7528FC", padding=(1, 2)))
    except UnicodeEncodeError:
        # Fallback to a simpler banner without emojis for Windows systems with encoding issues
        fallback_message = (
            f"Welcome to {package_name}\n\n"
            "* GitHub: https://github.com/langflow-ai/langflow\n"
            "# Discord: https://discord.com/invite/EqksyE2EX9\n\n"
            f"{telemetry_text}\n\n"
            f"[OK] Open Langflow -> {protocol}://{access_host}:{port}"
        )
        try:
            console.print()  # Add line break before fallback banner
            console.print(Panel.fit(fallback_message, border_style="#7528FC", padding=(1, 2)))
        except UnicodeEncodeError:
            # Last resort: use logger instead of print
            logger.info(f"Welcome to {package_name}")
            logger.info("GitHub: https://github.com/langflow-ai/langflow")
            logger.info("Discord: https://discord.com/invite/EqksyE2EX9")
            logger.info(f"Open Langflow: {protocol}://{access_host}:{port}")


@app.command()
def superuser(
    username: str = typer.Option(
        None,
        help="Username for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled.",
    ),
    password: str = typer.Option(
        None,
        help="Password for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled.",
    ),
    log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
    auth_token: str = typer.Option(
        None,
        help="Authentication token of existing superuser.",
        envvar="LANGFLOW_SUPERUSER_TOKEN",
    ),
) -> None:
    """Create a superuser.

    When AUTO_LOGIN is enabled, uses default credentials.
    In production mode, requires authentication.
    """
    # Logging is set up first so the async workflow below already uses the requested level.
    configure(log_level=log_level)

    # All real work happens in the async helper; this command only drives the event loop.
    creation_task = _create_superuser(username, password, auth_token)
    asyncio.run(creation_task)


async def _create_superuser(username: str, password: str, auth_token: str | None):
    """Create a superuser, enforcing the CLI switch and the authentication rules.

    Flow, as implemented below:
      1. Services are initialized and the ENABLE_SUPERUSER_CLI setting is checked.
      2. With AUTO_LOGIN enabled the default credentials replace whatever was passed in;
         otherwise missing credentials are prompted for interactively.
      3. With AUTO_LOGIN enabled, creation is only allowed when no superuser exists yet.
         Without AUTO_LOGIN, the first superuser needs no authentication; any further one
         requires ``auth_token`` to resolve (as JWT, then as API key) to an existing superuser.
      4. The user is created, verified by re-reading it, and given a default folder.

    Args:
        username: Requested username; may be empty/None, in which case it is prompted for
            (or replaced by the default when AUTO_LOGIN is enabled).
        password: Requested password; handled like ``username``.
        auth_token: JWT or API key of an existing superuser, or None.

    Raises:
        typer.Exit: With code 1 when the CLI feature is disabled, when creation is not
            allowed in AUTO_LOGIN mode, or when authentication is missing or fails.
        RuntimeError: When the default folder for the new user could not be created.
    """
    await initialize_services()

    settings_service = get_settings_service()
    # Check if superuser creation via CLI is enabled
    if not settings_service.auth_settings.ENABLE_SUPERUSER_CLI:
        typer.echo("Error: Superuser creation via CLI is disabled.")
        typer.echo("Set LANGFLOW_ENABLE_SUPERUSER_CLI=true to enable this feature.")
        raise typer.Exit(1)

    if settings_service.auth_settings.AUTO_LOGIN:
        # Force default credentials for AUTO_LOGIN mode
        username = DEFAULT_SUPERUSER
        password = DEFAULT_SUPERUSER_PASSWORD.get_secret_value()
    else:
        # Production mode - prompt for credentials if not provided
        if not username:
            username = typer.prompt("Username")
        if not password:
            password = typer.prompt("Password", hide_input=True)

    from langflow.services.database.models.user.crud import get_all_superusers

    existing_superusers = []
    async with session_scope() as session:
        # Note that the default superuser is created by the initialize_services() function,
        # but leaving this check here in case we change that behavior
        existing_superusers = await get_all_superusers(session)
    # "First setup" means no superuser exists at all; it relaxes the authentication requirement below.
    is_first_setup = len(existing_superusers) == 0

    # If AUTO_LOGIN is true, only allow default superuser creation
    if settings_service.auth_settings.AUTO_LOGIN:
        if not is_first_setup:
            typer.echo("Error: Cannot create additional superusers when AUTO_LOGIN is enabled.")
            typer.echo("AUTO_LOGIN mode is for development with only the default superuser.")
            typer.echo("To create additional superusers:")
            typer.echo("1. Set LANGFLOW_AUTO_LOGIN=false")
            typer.echo("2. Run this command again with --auth-token")
            raise typer.Exit(1)

        typer.echo(f"AUTO_LOGIN enabled. Creating default superuser '{username}'...")
        # Do not echo the default password to avoid exposing it in logs.
    # AUTO_LOGIN is false - production mode
    elif is_first_setup:
        typer.echo("No superusers found. Creating first superuser...")
    else:
        # Authentication is required in production mode
        if not auth_token:
            typer.echo("Error: Creating a superuser requires authentication.")
            typer.echo("Please provide --auth-token with a valid superuser API key or JWT token.")
            typer.echo("To get a token, use: `uv run langflow api_key`")
            raise typer.Exit(1)

        # Validate the auth token
        try:
            auth_user = None
            async with session_scope() as session:
                # Try JWT first
                user = None
                try:
                    user = await get_current_user_by_jwt(auth_token, session)
                except (JWTError, HTTPException):
                    # Try API key
                    api_key_result = await check_key(session, auth_token)
                    # Only accept a result that exposes `is_superuser`; anything else leaves `user` as None.
                    if api_key_result and hasattr(api_key_result, "is_superuser"):
                        user = api_key_result
                auth_user = user

            if not auth_user or not auth_user.is_superuser:
                typer.echo(
                    "Error: Invalid token or insufficient privileges. Only superusers can create other superusers."
                )
                raise typer.Exit(1)
        except typer.Exit:
            raise  # Re-raise typer.Exit without wrapping
        except Exception as e:  # noqa: BLE001
            # Any unexpected failure during validation is reported as an authentication failure;
            # `from None` hides the original traceback from the CLI output.
            typer.echo(f"Error: Authentication failed - {e!s}")
            raise typer.Exit(1) from None

    # Auth complete, create the superuser
    async with session_scope() as session:
        from langflow.services.auth.utils import create_super_user

        if await create_super_user(db=session, username=username, password=password):
            # Verify that the superuser was created
            from langflow.services.database.models.user.model import User

            stmt = select(User).where(User.username == username)
            created_user: User = (await session.exec(stmt)).first()
            if created_user is None or not created_user.is_superuser:
                typer.echo("Superuser creation failed.")
                return
            # Now create the first folder for the user
            result = await get_or_create_default_folder(session, created_user.id)
            if result:
                typer.echo("Default folder created successfully.")
            else:
                msg = "Could not create default folder."
                raise RuntimeError(msg)

            # Log the superuser creation for audit purposes
            # NOTE(review): the suffix depends only on whether a token was supplied, so a token
            # passed during first-time setup is reported as "by authenticated user" even though
            # it was not validated on that path - confirm this is acceptable for the audit log.
            logger.warning(
                f"SECURITY AUDIT: New superuser '{username}' created via CLI command"
                + (" by authenticated user" if auth_token else " (first-time setup)")
            )
            typer.echo("Superuser created successfully.")

        else:
            logger.error(f"SECURITY AUDIT: Failed attempt to create superuser '{username}' via CLI")
            typer.echo("Superuser creation failed.")


# command to copy the langflow database from the cache to the current directory
# because now the database is stored per installation
@app.command()
def copy_db() -> None:
    """Copy the database files to the current directory.

    This function copies the 'langflow.db' and 'langflow-pre.db' files from the cache directory to the current
    directory.
    If the files exist in the cache directory, they will be copied to the same directory as this script (__main__.py).

    Returns:
        None
    """
    import shutil

    from platformdirs import user_cache_dir

    cache_dir = Path(user_cache_dir("langflow"))
    # The destination is the directory containing this file, which is where the
    # per-installation database is expected to live.
    destination_folder = Path(__file__).parent

    # (source file, message when copied, message when missing) - handled in this order.
    copy_plan = (
        (
            cache_dir / "langflow.db",
            f"Database copied to {destination_folder}",
            "Database not found in the cache directory.",
        ),
        (
            cache_dir / "langflow-pre.db",
            f"Pre-release database copied to {destination_folder}",
            "Pre-release database not found in the cache directory.",
        ),
    )
    for source_file, copied_message, missing_message in copy_plan:
        if source_file.exists():
            shutil.copy(source_file, destination_folder)
            typer.echo(copied_message)
        else:
            typer.echo(missing_message)


async def _migration(*, test: bool, fix: bool) -> None:
    """Initialize the services, optionally run the migrations, then display the migration test results.

    Args:
        test: When falsy, the migrations are run before the migration test is executed.
        fix: Forwarded to `initialize_services` as `fix_migration`.
    """
    await initialize_services(fix_migration=fix)
    database_service = get_db_service()
    should_apply = not test
    if should_apply:
        await database_service.run_migrations()
    # The migration test runs in every case, whether or not migrations were applied above.
    display_results(await database_service.run_migrations_test())


@app.command()
def migration(
    test: bool = typer.Option(default=True, help="Run migrations in test mode."),  # noqa: FBT001
    fix: bool = typer.Option(  # noqa: FBT001
        default=False,
        help="Fix migrations. This is a destructive operation, and should only be used if you know what you are doing.",
    ),
) -> None:
    """Run or test migrations."""
    # `--fix` is flagged as destructive in its help text, so require an explicit confirmation first.
    if fix:
        confirmed = typer.confirm(
            "This will delete all data necessary to fix migrations. Are you sure you want to continue?"
        )
        if not confirmed:
            raise typer.Abort

    asyncio.run(_migration(test=test, fix=fix))


@app.command()
def api_key(
    log_level: str = typer.Option("error", help="Logging level."),
) -> None:
    """Creates an API key for the default superuser if AUTO_LOGIN is enabled.

    The first API key found for the default superuser, if any, is deleted before a new key named "CLI" is
    created and displayed.

    Args:
        log_level (str, optional): Logging level. Defaults to "error".

    Returns:
        None
    """
    configure(log_level=log_level)

    async def _recreate_cli_key():
        """Return the newly created key, or None when the preconditions are not met."""
        await initialize_services()
        if not get_settings_service().auth_settings.AUTO_LOGIN:
            # TODO: Allow non-auto-login users to create API keys via CLI
            typer.echo("Auto login is disabled. API keys cannot be created through the CLI.")
            return None

        async with session_scope() as session:
            from langflow.services.database.models.user.model import User

            superuser_query = select(User).where(User.username == DEFAULT_SUPERUSER)
            superuser = (await session.exec(superuser_query)).first()
            if not superuser:
                typer.echo(
                    "Default superuser not found. This command requires a superuser and AUTO_LOGIN to be enabled."
                )
                return None
            from langflow.services.database.models.api_key.crud import create_api_key, delete_api_key
            from langflow.services.database.models.api_key.model import ApiKey, ApiKeyCreate

            # Only the first key returned for the superuser is removed before the new one is created.
            key_query = select(ApiKey).where(ApiKey.user_id == superuser.id)
            existing_key = (await session.exec(key_query)).first()
            if existing_key:
                await delete_api_key(session, existing_key.id)

            return await create_api_key(session, ApiKeyCreate(name="CLI"), user_id=superuser.id)

    created_key = asyncio.run(_recreate_cli_key())
    # Show the key in a banner and tell the user it won't be shown again
    if created_key:
        api_key_banner(created_key)


def show_version(*, value: bool):
    """Print the Langflow version and exit when the version flag is set.

    Args:
        value: Value of the version option; nothing happens when it is falsy.

    Raises:
        typer.Exit: After the version line has been printed.
    """
    if not value:
        return
    fallback = "DEV"
    info = get_version_info()
    # Fall back to "DEV" when no version information is available at all.
    version = fallback
    if info:
        version = info.get("version", fallback)
    typer.echo(f"langflow {version}")
    raise typer.Exit


# Root callback of the Typer app: it only exists to attach the global `--version` / `-v` option.
# NOTE: documented with comments rather than a docstring on purpose -- Typer uses a callback's
# docstring as the application's help text, so adding one would change the `--help` output.
@app.callback()
def version_option(
    *,
    version: bool = typer.Option(
        None,
        "--version",
        "-v",
        callback=show_version,  # prints the version and exits when the flag is passed
        is_eager=True,  # ask Typer/Click to process this option before the other parameters
        help="Show the version and exit.",
    ),
):
    # Intentionally empty: all the work is done by the `show_version` option callback.
    pass


def api_key_banner(unmasked_api_key) -> None:
    """Display a newly created API key and try to copy it to the clipboard.

    The key is printed inside a rich panel. When the console cannot encode the panel, the same
    information is printed as plain text instead. Copying to the clipboard is best-effort and the
    message only claims the key was copied when the copy actually succeeded.

    Args:
        unmasked_api_key: Object exposing the plain-text key through its `api_key` attribute.
    """
    import pyperclip

    secret = unmasked_api_key.api_key
    paste_modifier = "Cmd" if platform.system() == "Darwin" else "Ctrl"

    # pyperclip raises PyperclipException when no copy/paste mechanism is available (e.g. on a
    # headless server). That must not abort the banner: this is the only time the key is shown.
    try:
        pyperclip.copy(secret)
    except pyperclip.PyperclipException:
        copied = False
    else:
        copied = True

    if copied:
        clipboard_markup = (
            f"The API key has been copied to your clipboard. [bold]{paste_modifier} + V[/bold] to paste it."
        )
        clipboard_plain = f"The API key has been copied to your clipboard. {paste_modifier} + V to paste it."
    else:
        clipboard_markup = clipboard_plain = "The API key could not be copied to your clipboard; copy it manually."

    panel = Panel(
        f"[bold]API Key Created Successfully:[/bold]\n\n"
        f"[bold blue]{secret}[/bold blue]\n\n"
        "This is the only time the API key will be displayed. \n"
        "Make sure to store it in a secure location. \n\n"
        f"{clipboard_markup}",
        box=box.ROUNDED,
        border_style="blue",
        expand=False,
    )
    # Use Windows-safe console initialization
    banner_console = Console(legacy_windows=True, emoji=False) if platform.system() == "Windows" else Console()

    try:
        banner_console.print(panel)
    except UnicodeEncodeError:
        # Fallback for Windows encoding issues. Printed with typer.echo rather than logger.info:
        # the `api_key` command configures the "error" log level by default, which would filter
        # info records and hide the key, and a logger could also persist the secret in log output.
        typer.echo("API Key Created Successfully:")
        typer.echo(secret)
        typer.echo("This is the only time the API key will be displayed.")
        typer.echo("Make sure to store it in a secure location.")
        typer.echo(clipboard_plain)


def main() -> None:
    """Run the Typer application with all Python warnings ignored."""
    # The filter change is scoped to the context manager, which restores the previous filters on exit.
    with warnings.catch_warnings():
        warnings.simplefilter(action="ignore")
        app()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Top-level boundary: log the full traceback, then exit with a failure status.
        # typer.Exit is only handled while a Typer/Click command is running; raised here it would
        # surface as a second unhandled exception, so terminate through sys.exit instead.
        logger.exception(e)
        sys.exit(1)
